SQLAlchemy ORM Expert

Install

npx skills add https://github.com/manutej/luxor-claude-marketplace --skill 'SQLAlchemy ORM Expert'

SQLAlchemy ORM Expert Skill

Overview

This skill provides comprehensive guidance for using SQLAlchemy 2.0+ in customer support systems, focusing on ORM patterns, session management, query optimization, async operations with FastAPI, and PostgreSQL integration. It covers everything from basic model definitions to advanced patterns for high-performance support applications.

Core Competencies

1. Customer Support Data Models

When building customer support systems, you need robust data models that represent tickets, users, comments, attachments, and their relationships. SQLAlchemy's declarative mapping with type hints provides a clean, modern approach.

Base Model Setup:

    from datetime import datetime
    from typing import Optional, List
    from sqlalchemy import String, Integer, DateTime, Text, ForeignKey, Enum, Boolean
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
    from sqlalchemy.sql import func
    import enum


    class Base(DeclarativeBase):
        """Base class for all ORM models"""
        pass


    class TicketStatus(enum.Enum):
        """Ticket status enumeration"""
        OPEN = "open"
        IN_PROGRESS = "in_progress"
        WAITING_ON_CUSTOMER = "waiting_on_customer"
        RESOLVED = "resolved"
        CLOSED = "closed"


    class TicketPriority(enum.Enum):
        """Ticket priority levels"""
        LOW = "low"
        MEDIUM = "medium"
        HIGH = "high"
        URGENT = "urgent"

User Model:

    class User(Base):
        __tablename__ = "users"

        id: Mapped[int] = mapped_column(primary_key=True)
        email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
        full_name: Mapped[str] = mapped_column(String(255), nullable=False)
        is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
        is_staff: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
        password_hash: Mapped[str] = mapped_column(String(255), nullable=False)

        # Timestamps
        created_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now(), nullable=False
        )
        updated_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
        )
        last_login: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))

        # Relationships
        tickets_created: Mapped[List["Ticket"]] = relationship(
            "Ticket",
            back_populates="creator",
            foreign_keys="Ticket.creator_id",
            cascade="all, delete-orphan",
        )
        tickets_assigned: Mapped[List["Ticket"]] = relationship(
            "Ticket", back_populates="assignee", foreign_keys="Ticket.assignee_id"
        )
        comments: Mapped[List["Comment"]] = relationship(
            "Comment", back_populates="author", cascade="all, delete-orphan"
        )

        def __repr__(self) -> str:
            return f"<User(id={self.id}, email={self.email!r})>"

Ticket Model:

    class Ticket(Base):
        __tablename__ = "tickets"

        id: Mapped[int] = mapped_column(primary_key=True)
        ticket_number: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True)
        title: Mapped[str] = mapped_column(String(500), nullable=False)
        description: Mapped[str] = mapped_column(Text, nullable=False)

        # Status and priority
        status: Mapped[TicketStatus] = mapped_column(
            Enum(TicketStatus), default=TicketStatus.OPEN, nullable=False, index=True
        )
        priority: Mapped[TicketPriority] = mapped_column(
            Enum(TicketPriority), default=TicketPriority.MEDIUM, nullable=False, index=True
        )

        # Foreign keys
        creator_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False, index=True)
        assignee_id: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"), index=True)

        # Soft delete
        deleted_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))

        # Timestamps
        created_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now(), nullable=False, index=True
        )
        updated_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
        )
        resolved_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))

        # Relationships
        creator: Mapped["User"] = relationship(
            "User", back_populates="tickets_created", foreign_keys=[creator_id]
        )
        assignee: Mapped[Optional["User"]] = relationship(
            "User", back_populates="tickets_assigned", foreign_keys=[assignee_id]
        )
        comments: Mapped[List["Comment"]] = relationship(
            "Comment",
            back_populates="ticket",
            cascade="all, delete-orphan",
            order_by="Comment.created_at",
        )
        # NOTE: the Attachment model is referenced here but not defined in this guide
        attachments: Mapped[List["Attachment"]] = relationship(
            "Attachment", back_populates="ticket", cascade="all, delete-orphan"
        )
        tags: Mapped[List["Tag"]] = relationship(
            "Tag", secondary="ticket_tags", back_populates="tickets"
        )

        def __repr__(self) -> str:
            return f"<Ticket(id={self.id}, ticket_number={self.ticket_number!r})>"

2. Relationship Patterns

One-to-Many (Comments on Tickets):

    class Comment(Base):
        __tablename__ = "comments"

        id: Mapped[int] = mapped_column(primary_key=True)
        content: Mapped[str] = mapped_column(Text, nullable=False)
        is_internal: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)

        # Foreign keys
        ticket_id: Mapped[int] = mapped_column(ForeignKey("tickets.id"), nullable=False, index=True)
        author_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False, index=True)

        # Timestamps
        created_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now(), nullable=False
        )
        updated_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
        )

        # Relationships
        ticket: Mapped["Ticket"] = relationship("Ticket", back_populates="comments")
        author: Mapped["User"] = relationship("User", back_populates="comments")

        def __repr__(self) -> str:
            return f"<Comment(id={self.id}, ticket_id={self.ticket_id})>"

Many-to-Many (Tags on Tickets):

    from sqlalchemy import Table, Column

    # Association table for many-to-many relationship
    ticket_tags = Table(
        "ticket_tags",
        Base.metadata,
        Column("ticket_id", ForeignKey("tickets.id"), primary_key=True),
        Column("tag_id", ForeignKey("tags.id"), primary_key=True),
        Column("created_at", DateTime(timezone=True), server_default=func.now()),
    )


    class Tag(Base):
        __tablename__ = "tags"

        id: Mapped[int] = mapped_column(primary_key=True)
        name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True)
        color: Mapped[str] = mapped_column(String(7), nullable=False)  # Hex color
        description: Mapped[Optional[str]] = mapped_column(String(500))

        # Timestamps
        created_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now(), nullable=False
        )

        # Relationships
        tickets: Mapped[List["Ticket"]] = relationship(
            "Ticket", secondary=ticket_tags, back_populates="tags"
        )

        def __repr__(self) -> str:
            return f"<Tag(id={self.id}, name={self.name!r})>"

3. Session Management

Synchronous Session Setup:

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker, Session
    from contextlib import contextmanager

    from typing import Iterator

    # Database URL
    DATABASE_URL = "postgresql://user:password@localhost:5432/support_db"

    # Create engine with connection pooling
    engine = create_engine(
        DATABASE_URL,
        pool_pre_ping=True,  # Verify connections before using
        pool_size=10,        # Number of connections to maintain
        max_overflow=20,     # Additional connections when pool is full
        echo=False,          # Set to True for SQL logging
    )

    # Create session factory
    SessionLocal = sessionmaker(
        bind=engine,
        autocommit=False,
        autoflush=False,
        expire_on_commit=False,  # Don't expire objects after commit
    )


    @contextmanager
    def get_db_session() -> Iterator[Session]:
        """Context manager for database sessions"""
        session = SessionLocal()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

Async Session Setup for FastAPI:

    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

    from typing import AsyncIterator

    # Async database URL (using asyncpg)
    ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/support_db"

    # Create async engine
    async_engine = create_async_engine(
        ASYNC_DATABASE_URL,
        pool_pre_ping=True,
        pool_size=10,
        max_overflow=20,
        echo=False,
    )

    # Create async session factory
    AsyncSessionLocal = async_sessionmaker(
        bind=async_engine,
        class_=AsyncSession,
        autocommit=False,
        autoflush=False,
        expire_on_commit=False,
    )


    async def get_async_db() -> AsyncIterator[AsyncSession]:
        """Dependency for FastAPI to get async database session"""
        async with AsyncSessionLocal() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise

4. Query Optimization and Eager Loading

Avoiding N+1 Queries with Joined Load:

    from sqlalchemy import select
    from sqlalchemy.orm import joinedload, selectinload


    async def get_tickets_with_details(
        session: AsyncSession,
        status: Optional[TicketStatus] = None,
    ) -> List[Ticket]:
        """
        Fetch tickets with all related data in optimized queries.
        Uses joinedload for single-row relationships and selectinload for collections.
        """
        stmt = (
            select(Ticket)
            .options(
                joinedload(Ticket.creator),  # One-to-one/many-to-one: use joinedload
                joinedload(Ticket.assignee),
                selectinload(Ticket.comments).joinedload(Comment.author),  # Collections: use selectinload
                selectinload(Ticket.attachments),
                selectinload(Ticket.tags),
            )
            .where(Ticket.deleted_at.is_(None))  # Soft delete filter
        )

        if status:
            stmt = stmt.where(Ticket.status == status)

        stmt = stmt.order_by(Ticket.created_at.desc())
        result = await session.execute(stmt)
        return list(result.unique().scalars().all())

Select In Load for Better Performance:

    async def get_user_with_tickets(session: AsyncSession, user_id: int) -> Optional[User]:
        """
        Fetch user with all tickets using selectinload for better performance
        on large collections.
        """
        stmt = (
            select(User)
            .options(
                selectinload(User.tickets_created).selectinload(Ticket.comments),
                selectinload(User.tickets_assigned),
            )
            .where(User.id == user_id)
        )
        result = await session.execute(stmt)
        return result.unique().scalar_one_or_none()

5. Complex Queries and Filters

Advanced Filtering:

    from sqlalchemy import and_, or_, not_, func, case
    from datetime import timedelta


    async def search_tickets(
        session: AsyncSession,
        search_term: Optional[str] = None,
        status_list: Optional[List[TicketStatus]] = None,
        priority: Optional[TicketPriority] = None,
        assignee_id: Optional[int] = None,
        created_after: Optional[datetime] = None,
        tags: Optional[List[str]] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> tuple[List[Ticket], int]:
        """
        Advanced ticket search with multiple filters.
        Returns tickets and total count.
        """

        # Base query (eager-loading options are added after the count is taken)
        stmt = select(Ticket).where(Ticket.deleted_at.is_(None))

        # Apply filters
        if search_term:
            pattern = f"%{search_term}%"
            search_filter = or_(
                Ticket.title.ilike(pattern),
                Ticket.description.ilike(pattern),
                Ticket.ticket_number.ilike(pattern),
            )
            stmt = stmt.where(search_filter)
        if status_list:
            stmt = stmt.where(Ticket.status.in_(status_list))
        if priority:
            stmt = stmt.where(Ticket.priority == priority)
        if assignee_id:
            stmt = stmt.where(Ticket.assignee_id == assignee_id)
        if created_after:
            stmt = stmt.where(Ticket.created_at >= created_after)
        if tags:
            # EXISTS instead of JOIN, so a ticket matching several tags is
            # neither counted twice nor duplicated in a page of results
            stmt = stmt.where(Ticket.tags.any(Tag.name.in_(tags)))

        # Count query
        count_stmt = select(func.count()).select_from(stmt.subquery())
        count_result = await session.execute(count_stmt)
        total = count_result.scalar_one()

        # Apply eager loading, ordering and pagination
        stmt = (
            stmt.options(
                joinedload(Ticket.creator),
                joinedload(Ticket.assignee),
                selectinload(Ticket.tags),
            )
            .order_by(Ticket.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        result = await session.execute(stmt)
        tickets = list(result.unique().scalars().all())
        return tickets, total

6. Aggregation Queries for Analytics

Ticket Statistics:

    from sqlalchemy import func, case, extract, literal_column
    from typing import Dict, Any


    async def get_ticket_statistics(
        session: AsyncSession,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> Dict[str, Any]:
        """
        Get comprehensive ticket statistics for analytics dashboard.
        """

        # Base filter
        base_filter = Ticket.deleted_at.is_(None)
        if start_date:
            base_filter = and_(base_filter, Ticket.created_at >= start_date)
        if end_date:
            base_filter = and_(base_filter, Ticket.created_at <= end_date)

        # Count by status
        status_stmt = (
            select(Ticket.status, func.count(Ticket.id).label("count"))
            .where(base_filter)
            .group_by(Ticket.status)
        )
        status_result = await session.execute(status_stmt)
        status_counts = {row[0].value: row[1] for row in status_result}

        # Count by priority
        priority_stmt = (
            select(Ticket.priority, func.count(Ticket.id).label("count"))
            .where(base_filter)
            .group_by(Ticket.priority)
        )
        priority_result = await session.execute(priority_stmt)
        priority_counts = {row[0].value: row[1] for row in priority_result}

        # Average resolution time
        # extract() renders EXTRACT(epoch FROM ...); func.extract() would emit
        # a plain function call, which PostgreSQL does not accept
        resolution_stmt = select(
            func.avg(extract("epoch", Ticket.resolved_at - Ticket.created_at)).label("avg_seconds")
        ).where(and_(base_filter, Ticket.resolved_at.is_not(None)))
        resolution_result = await session.execute(resolution_stmt)
        avg_resolution_seconds = resolution_result.scalar_one() or 0

        # Tickets per assignee
        assignee_stmt = (
            select(
                User.full_name,
                func.count(Ticket.id).label("ticket_count"),
                func.avg(
                    case(
                        (
                            Ticket.resolved_at.is_not(None),
                            extract("epoch", Ticket.resolved_at - Ticket.created_at),
                        )
                    )
                ).label("avg_resolution_time"),
            )
            .select_from(Ticket)
            .join(Ticket.assignee)
            .where(base_filter)
            .group_by(User.id, User.full_name)
            .order_by(func.count(Ticket.id).desc())
        )
        assignee_result = await session.execute(assignee_stmt)
        assignee_stats = [
            {
                "assignee": row[0],
                "ticket_count": row[1],
                "avg_resolution_hours": (float(row[2]) / 3600) if row[2] else None,
            }
            for row in assignee_result
        ]

        return {
            "status_counts": status_counts,
            "priority_counts": priority_counts,
            "avg_resolution_hours": float(avg_resolution_seconds) / 3600,
            "assignee_stats": assignee_stats,
        }

7. Bulk Operations for Data Curation

Bulk Insert:

    async def bulk_create_tickets(
        session: AsyncSession,
        tickets_data: List[Dict[str, Any]],
    ) -> List[Ticket]:
        """
        Efficiently create multiple tickets in a single transaction.
        """
        tickets = [Ticket(**data) for data in tickets_data]
        session.add_all(tickets)
        await session.flush()  # Flush to get IDs without committing
        return tickets


    async def bulk_insert_with_return(
        session: AsyncSession,
        tickets_data: List[Dict[str, Any]],
    ) -> List[Ticket]:
        """
        Bulk insert with RETURNING clause for PostgreSQL.
        """
        from sqlalchemy.dialects.postgresql import insert

        stmt = insert(Ticket).returning(Ticket)
        result = await session.execute(stmt, tickets_data)
        tickets = list(result.scalars().all())
        return tickets

Bulk Update:

    from sqlalchemy import update


    async def bulk_update_ticket_status(
        session: AsyncSession,
        ticket_ids: List[int],
        new_status: TicketStatus,
    ) -> int:
        """
        Update status for multiple tickets efficiently.
        Returns number of updated rows.
        """
        stmt = (
            update(Ticket)
            .where(and_(Ticket.id.in_(ticket_ids), Ticket.deleted_at.is_(None)))
            .values(status=new_status, updated_at=func.now())
        )
        result = await session.execute(stmt)
        await session.commit()
        return result.rowcount


    async def bulk_assign_tickets(
        session: AsyncSession,
        ticket_ids: List[int],
        assignee_id: int,
    ) -> int:
        """
        Bulk assign tickets to a user.
        """
        stmt = (
            update(Ticket)
            .where(Ticket.id.in_(ticket_ids))
            .values(
                assignee_id=assignee_id,
                status=TicketStatus.IN_PROGRESS,
                updated_at=func.now(),
            )
        )
        result = await session.execute(stmt)
        await session.commit()
        return result.rowcount

8. Soft Deletes and Audit Trails

Soft Delete Implementation:

    async def soft_delete_ticket(session: AsyncSession, ticket_id: int) -> bool:
        """
        Soft delete a ticket by setting deleted_at timestamp.
        """
        stmt = (
            update(Ticket)
            .where(and_(Ticket.id == ticket_id, Ticket.deleted_at.is_(None)))
            .values(deleted_at=func.now())
        )
        result = await session.execute(stmt)
        await session.commit()
        return result.rowcount > 0


    async def restore_ticket(session: AsyncSession, ticket_id: int) -> bool:
        """
        Restore a soft-deleted ticket.
        """
        stmt = update(Ticket).where(Ticket.id == ticket_id).values(deleted_at=None)
        result = await session.execute(stmt)
        await session.commit()
        return result.rowcount > 0

Audit Trail Model:

    from sqlalchemy import JSON


    class AuditLog(Base):
        __tablename__ = "audit_logs"

        id: Mapped[int] = mapped_column(primary_key=True)
        table_name: Mapped[str] = mapped_column(String(100), nullable=False, index=True)
        record_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
        action: Mapped[str] = mapped_column(String(50), nullable=False)  # CREATE, UPDATE, DELETE
        user_id: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"), index=True)
        changes: Mapped[Optional[Dict]] = mapped_column(JSON)  # Store before/after values
        created_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now(), nullable=False, index=True
        )

        user: Mapped[Optional["User"]] = relationship("User")

9. Event Listeners for Automation

Automatic Audit Logging:

    from sqlalchemy import event, inspect


    def _jsonable(value):
        """Convert enum members to their values so they can be stored as JSON"""
        return value.value if isinstance(value, enum.Enum) else value


    @event.listens_for(Ticket, "after_insert")
    def log_ticket_created(mapper, connection, target):
        """Automatically log ticket creation"""
        # Write through the connection used by the flush. A separate Session
        # created here would never be flushed, so the audit row would be lost.
        connection.execute(
            AuditLog.__table__.insert().values(
                table_name="tickets",
                record_id=target.id,
                action="CREATE",
                changes={
                    "ticket_number": target.ticket_number,
                    "status": target.status.value,
                },
            )
        )


    @event.listens_for(Ticket, "after_update")
    def log_ticket_updated(mapper, connection, target):
        """Automatically log ticket updates"""
        state = inspect(target)
        changes = {}
        for attr in ["status", "priority", "assignee_id"]:
            hist = state.attrs[attr].history
            if hist.has_changes():
                changes[attr] = {
                    "old": _jsonable(hist.deleted[0]) if hist.deleted else None,
                    "new": _jsonable(hist.added[0]) if hist.added else None,
                }

        if changes:
            connection.execute(
                AuditLog.__table__.insert().values(
                    table_name="tickets",
                    record_id=target.id,
                    action="UPDATE",
                    changes=changes,
                )
            )

10. Hybrid Properties and Expressions

Computed Properties:

    from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method


    class Ticket(Base):

        # ... existing fields ...

        @hybrid_property
        def is_overdue(self) -> bool:
            """Check if ticket is overdue (open for more than 7 days)"""
            if self.status in [TicketStatus.RESOLVED, TicketStatus.CLOSED]:
                return False
            # created_at is timezone-aware, so "now" must be aware as well;
            # subtracting it from a naive datetime.utcnow() raises TypeError
            now = datetime.now(self.created_at.tzinfo)
            return (now - self.created_at).days > 7

        @is_overdue.expression
        def is_overdue(cls):
            """SQL expression for is_overdue"""
            return and_(
                cls.status.notin_([TicketStatus.RESOLVED, TicketStatus.CLOSED]),
                func.date_part("day", func.now() - cls.created_at) > 7,
            )

        # No SQL expression is defined for the two members below, so use them
        # on loaded instances only, not inside queries.
        @hybrid_property
        def response_time_hours(self) -> Optional[float]:
            """Time to first comment in hours"""
            if not self.comments:
                return None
            first_comment = min(self.comments, key=lambda c: c.created_at)
            delta = first_comment.created_at - self.created_at
            return delta.total_seconds() / 3600

        @hybrid_method
        def is_priority_escalation_needed(self, hours: int = 24) -> bool:
            """Check if priority escalation is needed"""
            if self.status == TicketStatus.OPEN:
                now = datetime.now(self.created_at.tzinfo)
                age_hours = (now - self.created_at).total_seconds() / 3600
                return age_hours > hours
            return False

11. Connection Pooling and Performance

Optimized Engine Configuration:

    from sqlalchemy.pool import AsyncAdaptedQueuePool

    # Production-grade engine configuration
    production_engine = create_async_engine(
        ASYNC_DATABASE_URL,
        # QueuePool cannot be used with an asyncio engine; this is its
        # asyncio-compatible counterpart (and the default for async engines)
        poolclass=AsyncAdaptedQueuePool,
        pool_size=20,        # Maintain 20 connections
        max_overflow=40,     # Allow 40 additional connections
        pool_timeout=30,     # Wait 30 seconds for connection
        pool_recycle=3600,   # Recycle connections after 1 hour
        pool_pre_ping=True,  # Verify connection health
        echo_pool=False,     # Disable pool logging in production
        echo=False,          # Disable SQL logging in production
        connect_args={
            "server_settings": {"jit": "off"},  # Disable JIT for PostgreSQL
            "command_timeout": 60,
            "timeout": 30,
        },
    )

12. Testing with Pytest

Pytest Fixtures:

    import pytest
    import pytest_asyncio
    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

    # Async fixtures assume the pytest-asyncio plugin. Session-scoped async
    # fixtures also need a matching event-loop scope in its configuration.


    @pytest.fixture(scope="session")
    def test_database_url():
        """Test database URL"""
        return "postgresql+asyncpg://test_user:test_pass@localhost:5432/test_support_db"


    @pytest_asyncio.fixture(scope="session")
    async def async_engine(test_database_url):
        """Create test engine"""
        engine = create_async_engine(test_database_url, echo=True)
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        yield engine
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
        await engine.dispose()


    @pytest_asyncio.fixture
    async def async_session(async_engine):
        """Create test session"""
        async_session_factory = async_sessionmaker(
            bind=async_engine, class_=AsyncSession, expire_on_commit=False
        )
        async with async_session_factory() as session:
            yield session
            await session.rollback()


    @pytest_asyncio.fixture
    async def test_user(async_session):
        """Create test user"""
        user = User(
            email="test@example.com",
            full_name="Test User",
            password_hash="hashed_password",
            is_active=True,
        )
        async_session.add(user)
        await async_session.commit()
        await async_session.refresh(user)
        return user

13. Alembic Migrations

Migration Setup:

    # Initialize Alembic
    alembic init alembic

    # Create migration
    alembic revision --autogenerate -m "Create support tables"

    # Apply migration
    alembic upgrade head

    # Rollback migration
    alembic downgrade -1

Migration Template:

    """Create support tables

    Revision ID: 001
    Create Date: 2025-01-15 10:00:00
    """
    from typing import Sequence, Union

    from alembic import op
    import sqlalchemy as sa
    from sqlalchemy.dialects import postgresql

    revision: str = '001'
    down_revision: Union[str, None] = None
    branch_labels: Union[str, Sequence[str], None] = None
    depends_on: Union[str, Sequence[str], None] = None


    def upgrade() -> None:
        # Create users table
        op.create_table(
            'users',
            sa.Column('id', sa.Integer(), nullable=False),
            sa.Column('email', sa.String(length=255), nullable=False),
            sa.Column('full_name', sa.String(length=255), nullable=False),
            sa.Column('is_active', sa.Boolean(), nullable=False),
            sa.Column('is_staff', sa.Boolean(), nullable=False),
            sa.Column('password_hash', sa.String(length=255), nullable=False),
            sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
            sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
            sa.Column('last_login', sa.DateTime(timezone=True), nullable=True),
            sa.PrimaryKeyConstraint('id'),
        )
        # unique=True together with index=True on User.email maps to a single
        # unique index rather than a separate unique constraint
        op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)


    def downgrade() -> None:
        op.drop_index(op.f('ix_users_email'), table_name='users')
        op.drop_table('users')
Best Practices

- Always use type hints with Mapped[] for better IDE support and validation
- Use indexes on foreign keys and frequently queried columns
- Implement soft deletes for data recovery and audit compliance
- Use selectinload for collections to avoid N+1 queries
- Use joinedload for single-row relationships (many-to-one)
- Enable pool_pre_ping to handle stale connections
- Set expire_on_commit=False when using async sessions
- Use transactions with proper rollback handling
- Implement audit logging for compliance and debugging
- Write comprehensive tests with isolated test databases
Common Pitfalls

- N+1 Query Problem: Eager load (selectinload or joinedload) every relationship you are going to access
- Missing Indexes: Add indexes on foreign keys and filter columns
- Not Closing Sessions: Use context managers or FastAPI dependencies
- Lazy Loading in Async: Implicit lazy loading raises an error under AsyncSession; eager load, or load the attribute explicitly with an awaitable API
- Mixing Sync and Async: Never mix sync and async sessions
- Not Using Transactions: Always wrap related operations in transactions
- Forgetting pool_pre_ping: Results in stale connection errors
- Not Setting Timezones: Always use timezone-aware DateTime
- Inefficient Bulk Operations: For large datasets, pass a list of dicts to session.execute(insert(Model), rows); Session.bulk_insert_mappings is a legacy API in SQLAlchemy 2.0
- Not Testing Query Performance: Always use EXPLAIN ANALYZE

Performance Tuning

- Use EXPLAIN ANALYZE to understand query execution plans
- Add composite indexes for common filter combinations
- Use partial indexes for filtered queries
- Implement query result caching with Redis
- Use read replicas for analytics queries
- Partition large tables by date ranges
- Use connection pooling appropriately for your workload
- Monitor slow queries with pg_stat_statements
- Use materialized views for complex analytics
- Implement query timeouts to prevent long-running queries

Security Considerations

- Never expose raw database errors to end users
- Use parameterized queries (SQLAlchemy does this automatically)
- Validate user input before database operations
- Implement row-level security in PostgreSQL
- Use SSL connections in production
- Rotate database credentials regularly
- Limit database user permissions to the minimum required
- Implement rate limiting on expensive queries
- Log all data access for audit trails
- Encrypt sensitive data at rest and in transit

Additional Resources

- SQLAlchemy 2.0 Documentation: https://docs.sqlalchemy.org/en/20/
- Alembic Documentation: https://alembic.sqlalchemy.org/
- PostgreSQL Documentation: https://www.postgresql.org/docs/
- FastAPI with SQLAlchemy: https://fastapi.tiangolo.com/tutorial/sql-databases/
- AsyncPG Documentation: https://magicstack.github.io/asyncpg/

This skill provides comprehensive guidance for building production-ready customer support systems with SQLAlchemy. Follow these patterns for maintainable, performant, and scalable applications.